package com.shujia.flink.sql

import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

object Demo10FIleSource {
  def main(args: Array[String]): Unit = {
    val settings: EnvironmentSettings = EnvironmentSettings
      .newInstance()
      //      .inStreamingMode()
      .inBatchMode()//批处理模式，输出最终结果
      .build()
    /**
     * flink sql环境
     *
     */
    val table: TableEnvironment = TableEnvironment.create(settings)

    table.executeSql(
      """
        |
        |CREATE TABLE student_file (
        |    id STRINg,
        |    name STRING,
        |    age INT,
        |    gender STRING,
        |    clazz STRING
        |)  WITH (
        |  'connector' = 'filesystem',           -- 必选：指定连接器类型
        |  'path' = 'data/students.txt',  -- 必选：指定路径
        |  'format' = 'csv'                     -- 必选：文件系统连接器指定 format
        |)
        |""".stripMargin)

    table.executeSql(
      """
        |
        |CREATE TABLE print_table
        |(
        |    clazz STRING,
        |    num BIGINT
        |)
        |WITH ('connector' = 'print')
        |""".stripMargin)

    table.executeSql(
      """
        |
        |insert into print_table
        |select clazz,count(1) as num from
        |student_file
        |group by clazz
        |""".stripMargin)
  }

}
